WorkflowsでCloud Functionsを並列実行してみた
日比谷オフィスの根本です。業務でGoogle CloudのWorkflowsで並列実行処理ができるかどうかを検証してみたのでその際の手順を記事にしました。
Workflowsとは
Google CloudのWorkflowsは、サーバレスのジョブ自動実行サービスです。
YAMLまたはJSON形式で処理を書くことができ、Google Cloudの他のサービスを呼び出すコネクタも用意されているので多くのサービスと連携させることができます。また、YAMLまたはJSONで書いた処理はWorkflowsの管理画面で可視化されるので実装したワークフローを視覚的に把握することもできます。
Workfowsで処理の並列実行ができるということを検証する機会がありましたので、一連の検証結果を記事にしました。
検証の全体像
Workflowsの並列実行を検証するにあたり、以下の処理を実装して検証をしてみました。
- Cloud Storageのバケットに保存されているファイルの一覧をCloud Functionsで取得
- ファイルの数だけCloud Functionsを並列実行
検証するために実装したCloud Functions関数は以下となります。
- ファイル一覧取得関数:ファイル名の一覧を取得してHTTPレスポンスで返却
- 並列実行確認関数:ファイル名を引数に受け取り、ログに出力
それぞれ準備をして検証していきます。
準備①:Cloud Storageへのファイルのアップロード
バケットを作成します。作成した関数と同じリージョンに作成します。ここで作成したバケット名はCloud Functions内で用います。
gsutil mb -c standard -l asia-northeast1 gs://[バケット名]
touchコマンドでファイルを作成します。
touch test1.txt touch test2.txt touch test3.txt
作成したバケットへ、gsutil cpコマンドでファイルをアップロードします。
gsutil cp ./test* gs://[バケット名]
準備②:Cloud Functionsの実装(ファイル一覧取得関数)
フォルダ構造は以下となります
parallel_process_function/ ├requirements.txt └main.py
まずはバケット内のファイル一覧取得関数のプログラムを作成します。
import functions_framework from google.cloud import storage from flask import jsonify def list_blobs(bucket_name): storage_client = storage.Client() blobs = storage_client.list_blobs(bucket_name) blob_names = [blob.name for blob in blobs] return blob_names @functions_framework.http def http_get_filename_list(request): BUCKET = "準備①で作成したバケット名" list_blobs_result = list_blobs(BUCKET) return jsonify({"file_names": list_blobs_result})
変数:BUCKETの値を、手順①で作成したバケット名にします。
Cloud FunctionsのトリガーはHTTPトリガーとします。
return文でjsonifyを用いている理由としては、jsonifyはHTTPヘッダーのContent-Typeをapplication/jsonにしてくれるからで、後続のWorkflowsの処理に都合が良いからです。
requirements.txtはfunctions-framework以外にCloud Storageへの操作もあるのでgoogle-cloud-storageも含めます。
functions-framework==3.* google-cloud-storage>=1.36.1
デプロイは以下のgcloudコマンドで行います。 (検証用なのでサービスアカウントやメモリ設定はデフォルト設定のままとします)
gcloud functions deploy get_filename_list_function \ --gen2 \ --region=asia-northeast1 \ --runtime=python311 \ --source=. \ --entry-point=http_get_filename_list \ --trigger-http
gcloudコマンドを、main.pyとrequirement.txtの保存してあるパスをカレントディレクトリにして(parallel_process_functionディレクトリ直下)実行します。
Allow unauthenticated invocations of new function [parallel_process_function]? (y/N)? y
認証無し(誰からでもHTTPリクエストを受け付ける)で立ち上げて良いか確認されます。 今回は検証用ですぐ削除する関数なので[y]を押下して認証なしで作成をします。
準備③:Cloud Functionsの実装(並列実行確認関数)
フォルダ構造は以下となります
get_file_names_function/ ├requirements.txt └main.py
並列実行されるCloud Functionsの関数も準備します。
処理としては、HTTPリクエストオブジェクトからファイル名を取得して動作確認用にファイル名をprintする処理となります。
また、Cloud Functionsが並列起動されていることを確認しやすくするためにtime.sleep関数で5秒間の待機処理も実装しています。Cloud Functionsの関数自体の処理が簡素なためミリ秒で処理が終わってしまうのでsleep関数で待機することでCloud Functionsが並列で呼び出されていることを確認できるようにします。
import functions_framework import time @functions_framework.http def parallel_process(request): data = request.get_json() file_name = data.get('file_name') print(f"[{file_name}]:関数が起動されました。対象ファイルは[{file_name}]です。") time.sleep(5) print(f"[{file_name}]:処理終了") return 'OK'
requirements.txtはfunctions-frameworkのみとなります
functions-framework==3.*
こちらもgcloudコマンドでデプロイします。 gcloudコマンドを、main.pyとrequirement.txtの保存してあるパスで実行します。
gcloud functions deploy parallel_process_function \ --gen2 \ --region=asia-northeast1 \ --runtime=python311 \ --source=. \ --concurrency=3 \ --memory=2GiB \ --entry-point=parallel_process \ --trigger-http
こちらの関数は同時実行数を3にしています。 同時実行数を1より大きくするためにメモリも2GiBとしています。(Cloud Functionsの同時実行数を1より多くするにはvCPUを1以上にする必要があるため。メモリを2GiBとするとvCPU数が1になります) トリガーはこちらもHTTPトリガーとなります。こちらもコマンド実行時に 認証無し(誰からでもHTTPリクエストを受け付ける)で立ち上げて良いか確認されます。 こちらも検証用ですぐ削除する関数なので[y]を押下して認証なしで作成をします。
準備④:Workflowsの実装
Workflowsの処理全体の流れ
- バケット内のファイル名一覧をCloud Functionsで取得(ファイル一覧取得関数起動)
- ファイル一覧取得関数のレスポンスを受け取り、ファイルの数だけ並列でCloud Functionsを起動(並列実行確認関数)
- get_file_names: call: http.get args: url: "ファイル一覧取得関数のURL" result: getFileNameResult - processFiles: parallel: for: value: name in: ${getFileNameResult.body.file_names} steps: - processFile: call: http.post args: url: "並列実行確認関数のURL" body: file_name: ${name} result: processResult
parallelステップ内に記載した処理が並行実行されます。また、parallelステップ内にFORループを実装すると、FORループ内の処理が並行実行されます。今回はFORループ内で並列実行確認関数の呼び出しを行うことで、Cloud Functionsの並列実行を行います。
前準備として、get_file_namesのステップで[ファイル一覧取得関数]を呼び出して、ファイル名一覧を取得します。結果(ファイル一覧取得関数のレスポンス)を[getFileNameResult]変数に持ち、後続のprocessFilesステップでファイル名を取り出します。
[getFileNameResult]に入っているのはファイル一覧取得関数のレスポンスボディです。
Workflowsはレスポンスの Content-Type
ヘッダーで application/json
メディアタイプが指定されている場合、decode処理が不要で${変数名.body.JSONのキー}で変数にアクセスすることができます。
よって以下のステップでファイル名をFOR文の値に渡すことができます。
in: ${getFileNameResult.body.file_names}
そしてファイル名をリクエストボディに格納してファイル名ごとに並列実行確認関数を呼び出します。
call: http.post args: url: "並列実行確認関数のURL" body: file_name: ${name}
デプロイは以下のgcloudコマンドで行います
gcloud workflows deploy dev_workflow \ --location=asia-northeast1 \ --source=workflow.yaml
結果確認
Workflowsのデプロイに成功するとGoogle CloudのWorkflowsのサービス画面上で確認することができます。
ワークフローの詳細>ソース の画面に遷移をすると、デプロイしたワークフローを視覚的に確認することができます
ワークフローを実行してみます。 ワークフローの詳細の右側、「実行」ボタンを押下するとワークフローが実行されます。
実行時に引数を渡すことができますが、今回は何も設定しないのでそのまま実行します。
実行に成功すると可視化されているワークフローのそれぞれのステップが緑色になります。
並列実行確認関数のログを確認して、並列実行されているか確認をします。 以下は並列実行確認関数のログです(一部マスク処理しています)
赤枠の関数の起動時刻と、それぞれの関数の終了時刻(青枠)を見るとほぼ同時刻に起動されて同時刻に終了しているのがわかります。 視覚的にすると以下のイメージとなります。
上記の結果から、Workflowsのparallelステップを用いることでCloud Functionsの並列実行をWorkflowsから行うことができました。 並列実行ができることで、ワークフロー全体の処理時間短縮を図ることができると思います。
まとめ
Workflowsは処理フローを視覚的に見ることもでき、処理の実装もYAML・JSON形式で実装できるので手軽だと感じました。並列処理を組むことも手軽にでき、また今回は使用していませんが他のGoogle Cloudサービスとのコネクタも豊富に用意されているようで活用の幅はとても広いのではないかなと考えます。